From f55d810dea78c9b8bfe8ac4cd39247d5d514b113 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Juraci=20Paix=C3%A3o=20Kr=C3%B6hling?=
Date: Thu, 5 Dec 2024 10:58:35 +0100
Subject: [PATCH 1/7] [chore][processor/probabilisticsampler] Document clarity
 regarding sampling_priority configuration (#36668)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fixes #30410

Signed-off-by: Juraci Paixão Kröhling
---
 processor/probabilisticsamplerprocessor/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/processor/probabilisticsamplerprocessor/README.md b/processor/probabilisticsamplerprocessor/README.md
index 86654c9265d9..e0059d9050e3 100644
--- a/processor/probabilisticsamplerprocessor/README.md
+++ b/processor/probabilisticsamplerprocessor/README.md
@@ -314,7 +314,7 @@ The following configuration options can be modified:
 - `attribute_source` (string, optional, default = "traceID"): defines where to look for the attribute in from_attribute. The allowed values are `traceID` or `record`.
 - `from_attribute` (string, optional, default = ""): The name of a log record attribute used for sampling purposes, such as a unique log record ID. The value of the attribute is only used if the trace ID is absent or if `attribute_source` is set to `record`.
-- `sampling_priority` (string, optional, default = ""): The name of a log record attribute used to set a different sampling priority from the `sampling_percentage` setting. 0 means to never sample the log record, and >= 100 means to always sample the log record.
+- `sampling_priority` (string, optional, default = ""): The name of a log record attribute used to set a different sampling priority from the `sampling_percentage` setting. The record attribute's value should be between 0 and 100, where 0 means never sample the log record and >= 100 means always sample it.
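To make the revised wording concrete, here is a minimal logs-pipeline sketch of this option; the attribute name `priority` and the percentage are hypothetical examples, not defaults:

```yaml
processors:
  probabilistic_sampler:
    # Rate applied when the priority attribute is absent or between 1 and 99.
    sampling_percentage: 15
    # Hypothetical log record attribute holding a per-record priority (0-100):
    # 0 never samples the record, >= 100 always samples it.
    sampling_priority: priority
```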
Examples:

From b273ecef6cb423c2dbe1e219d9221dc1287e9bef Mon Sep 17 00:00:00 2001
From: Manu Agrawal
Date: Thu, 5 Dec 2024 11:29:49 +0000
Subject: [PATCH 2/7] Skip flaky tests on windows for cloudspanner receiver
 (#36683)

#### Link to tracking issue
Part of https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32397

---
 .../internal/filter/itemcardinality_test.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/receiver/googlecloudspannerreceiver/internal/filter/itemcardinality_test.go b/receiver/googlecloudspannerreceiver/internal/filter/itemcardinality_test.go
index 13c257804715..ddd57306bc01 100644
--- a/receiver/googlecloudspannerreceiver/internal/filter/itemcardinality_test.go
+++ b/receiver/googlecloudspannerreceiver/internal/filter/itemcardinality_test.go
@@ -240,6 +240,9 @@ func TestItemCardinalityFilter_FilterItems(t *testing.T) {
 }
 
 func TestItemCardinalityFilter_IncludeItem(t *testing.T) {
+	if runtime.GOOS == "windows" {
+		t.Skip("Skipping test on Windows due to https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32397")
+	}
 	timestamp := time.Now().UTC()
 	item1 := &Item{SeriesKey: key1, Timestamp: timestamp}
 	item2 := &Item{SeriesKey: key2, Timestamp: timestamp}

From 4a465fd0e2c42cb9fe2b35e4ad1eefe6f1893c7c Mon Sep 17 00:00:00 2001
From: Daniel Jaglowski
Date: Thu, 5 Dec 2024 11:22:17 -0500
Subject: [PATCH 3/7] [processor/routing] Deprecate processor (#36692)

Resolves https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36616

---
 .chloggen/deprecate-routing-processor.yaml |  27 +++++++
 connector/routingconnector/README.md       |  12 +--
 processor/routingprocessor/README.md       |  76 ++++++++++++++++++-
 .../internal/metadata/generated_status.go   |   6 +-
 processor/routingprocessor/metadata.yaml    |   2 +-
 5 files changed, 108 insertions(+), 15 deletions(-)
 create mode 100644 .chloggen/deprecate-routing-processor.yaml

diff --git a/.chloggen/deprecate-routing-processor.yaml b/.chloggen/deprecate-routing-processor.yaml
new file mode 100644
index 000000000000..411f91e17998
--- /dev/null
+++ b/.chloggen/deprecate-routing-processor.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: deprecation
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: routingprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Deprecated in favor of the routing connector.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [36616]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user, api]
diff --git a/connector/routingconnector/README.md b/connector/routingconnector/README.md
index 0198da0978ee..bfc4c26b2111 100644
--- a/connector/routingconnector/README.md
+++ b/connector/routingconnector/README.md
@@ -133,10 +133,10 @@ connectors:
       default_pipelines: [logs/other]
       table:
         - context: request
-          condition: reqeust["X-Tenant"] == "acme"
+          condition: request["X-Tenant"] == "acme"
           pipelines: [logs/acme]
         - context: request
-          condition: reqeust["X-Tenant"] == "ecorp"
+          condition: request["X-Tenant"] == "ecorp"
           pipelines: [logs/ecorp]
 
 service:
@@ -263,10 +263,10 @@ connectors:
           condition: severity_number < SEVERITY_NUMBER_ERROR
           pipelines: [logs/cheap]
         - context: request
-          condition: reqeust["X-Tenant"] == "acme"
+          condition: request["X-Tenant"] == "acme"
           pipelines: [logs/acme]
         - context: request
-          condition: reqeust["X-Tenant"] == "ecorp"
+          condition: request["X-Tenant"] == "ecorp"
          pipelines: [logs/ecorp]
 
 service:
@@ -285,10 +285,6 @@ service:
       exporters: [file/ecorp]
 ```
 
-## Differences between the Routing Connector and Routing Processor
-
-- The connector routes to pipelines, not exporters as the processor does.
-
 [Connectors README]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md
 [OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md
diff --git a/processor/routingprocessor/README.md b/processor/routingprocessor/README.md
index 27e1793058f3..5ae2c563d5c7 100644
--- a/processor/routingprocessor/README.md
+++ b/processor/routingprocessor/README.md
@@ -3,15 +3,83 @@
 | Status        |           |
 | ------------- |-----------|
-| Stability     | [beta]: traces, metrics, logs   |
+| Stability     | [deprecated]: traces, metrics, logs   |
 | Distributions | [contrib] |
 | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Frouting%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Frouting) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Frouting%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Frouting) |
 | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@jpkrohling](https://www.github.com/jpkrohling) |
 
-[beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta
+[deprecated]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#deprecated
 [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
 
+## Deprecation Notice
+
+This processor has been deprecated in favor of the [`routing` connector][routing_connector].
+
+### Migration
+
+The routing connector supports all features of the routing processor and more. However, the configuration is different. The general idea is the same, but there are a few key differences:
+
+- Rather than routing directly to exporters, the routing connector routes to pipelines. This allows processors to be included after routing decisions.
+- The connector is configured within the `connectors` section, rather than the `processors` section of the configuration.
+- Usage of the connector in pipelines is different. You must use it as an exporter AND as a receiver in each pipeline to which it can route.
+- Configuration is primarily based on [OTTL][OTTL].
+- Each route can be evaluated in a different [OTTL Context][ottl_contexts].
+
+#### Example
+
+Starting from the routing processor example configuration below, we can achieve the same result with the routing connector configuration that follows it:
+
+```yaml
+processors:
+  routing:
+    from_attribute: X-Tenant
+    default_exporters: [jaeger]
+    table:
+      - value: acme
+        exporters: [jaeger/acme]
+exporters:
+  jaeger:
+    endpoint: localhost:14250
+  jaeger/acme:
+    endpoint: localhost:24250
+service:
+  pipelines:
+    traces:
+      receivers: [otlp]
+      processors: [routing]
+      exporters: [jaeger, jaeger/acme]
+```
+
+```yaml
+connectors:
+  routing:
+    match_once: true
+    default_pipelines: [traces/jaeger]
+    table:
+      - context: request
+        condition: request["X-Tenant"] == "acme"
+        pipelines: [traces/jaeger/acme]
+exporters:
+  jaeger:
+    endpoint: localhost:14250
+  jaeger/acme:
+    endpoint: localhost:24250
+service:
+  pipelines:
+    traces:
+      receivers: [otlp]
+      exporters: [routing]
+    traces/jaeger:
+      receivers: [routing]
+      exporters: [jaeger]
+    traces/jaeger/acme:
+      receivers: [routing]
+      exporters: [jaeger/acme]
+```
+
+## Overview
+
 Routes logs, metrics or traces to specific exporters.
 
 This processor will either read a header from the incoming HTTP request (gRPC or plain HTTP), or it will read a resource attribute, and direct the trace information to specific exporters based on the value read.
@@ -114,4 +182,6 @@ The full list of settings exposed for this processor are documented [here](./con
 - [traces](./testdata/config_traces.yaml)
 
 [context_docs]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/context/README.md
-[OTTL]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/processing.md#telemetry-query-language
+[OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl#opentelemetry-transformation-language
+[ottl_contexts]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/contexts/README.md#opentelemetry-transformation-language-contexts
+[routing_connector]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/connector/routingconnector/README.md
diff --git a/processor/routingprocessor/internal/metadata/generated_status.go b/processor/routingprocessor/internal/metadata/generated_status.go
index 17d464b719d6..ba89f61c68c5 100644
--- a/processor/routingprocessor/internal/metadata/generated_status.go
+++ b/processor/routingprocessor/internal/metadata/generated_status.go
@@ -12,7 +12,7 @@ var (
 )
 
 const (
-	TracesStability  = component.StabilityLevelBeta
-	MetricsStability = component.StabilityLevelBeta
-	LogsStability    = component.StabilityLevelBeta
+	TracesStability  = component.StabilityLevelDeprecated
+	MetricsStability = component.StabilityLevelDeprecated
+	LogsStability    = component.StabilityLevelDeprecated
 )
diff --git a/processor/routingprocessor/metadata.yaml b/processor/routingprocessor/metadata.yaml
index c1a43f943292..9668e4fc217d 100644
--- a/processor/routingprocessor/metadata.yaml
+++ b/processor/routingprocessor/metadata.yaml
@@ -3,7 +3,7 @@ type: routing
 status:
   class: processor
   stability:
-    beta: [traces, metrics, logs]
+    deprecated: [traces, metrics, logs]
   distributions: [contrib]
codeowners: active: [jpkrohling] From 3fdb51b378ddf6882ccf4f9c52723de6bd92ea0e Mon Sep 17 00:00:00 2001 From: "Mengyi Zhou (bjrara)" Date: Thu, 5 Dec 2024 09:18:28 -0800 Subject: [PATCH 4/7] [awsxrayexporter] Generate url section in xray segment when net.peer.name is available (#36530) #### Description Generate url section in X-Ray segment when net.peer.name is available #### Link to tracking issue Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35375. #### Testing Unit test --- .../awsxrayexporter-fix-segment-gen.yaml | 27 +++++++++++++++++++ .../internal/translator/http.go | 1 + .../internal/translator/http_test.go | 1 + 3 files changed, 29 insertions(+) create mode 100644 .chloggen/awsxrayexporter-fix-segment-gen.yaml diff --git a/.chloggen/awsxrayexporter-fix-segment-gen.yaml b/.chloggen/awsxrayexporter-fix-segment-gen.yaml new file mode 100644 index 000000000000..77e4ad6ed1aa --- /dev/null +++ b/.chloggen/awsxrayexporter-fix-segment-gen.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsxrayexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Generate url section in xray segment when `net.peer.name` is available" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35375] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/awsxrayexporter/internal/translator/http.go b/exporter/awsxrayexporter/internal/translator/http.go
index e761eae7e00f..b83ccb105b77 100644
--- a/exporter/awsxrayexporter/internal/translator/http.go
+++ b/exporter/awsxrayexporter/internal/translator/http.go
@@ -92,6 +92,7 @@ func makeHTTP(span ptrace.Span) (map[string]pcommon.Value, *awsxray.HTTPData) {
 			hasHTTPRequestURLAttributes = true
 		case conventions.AttributeNetPeerName:
 			urlParts[key] = value.Str()
+			hasHTTPRequestURLAttributes = true
 		case conventions.AttributeNetPeerPort:
 			urlParts[key] = value.Str()
 			if len(urlParts[key]) == 0 {
diff --git a/exporter/awsxrayexporter/internal/translator/http_test.go b/exporter/awsxrayexporter/internal/translator/http_test.go
index 92fb6b50bc30..4361c33e967e 100644
--- a/exporter/awsxrayexporter/internal/translator/http_test.go
+++ b/exporter/awsxrayexporter/internal/translator/http_test.go
@@ -88,6 +88,7 @@ func TestClientSpanWithPeerAttributes(t *testing.T) {
 
 	assert.NotNil(t, httpData)
 	assert.NotNil(t, filtered)
+	assert.NotNil(t, httpData.Request.URL)
 
 	assert.Equal(t, "10.8.17.36", *httpData.Request.ClientIP)

From 0e379754c822d5a8a973bef569cf89263c945760 Mon Sep 17 00:00:00 2001
From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com>
Date: Thu, 5 Dec 2024 12:27:14 -0500
Subject: [PATCH 5/7] [chore][pkg/ottl] Move new function guidelines to
 CONTRIBUTING.md (#36687)

---
 pkg/ottl/CONTRIBUTING.md     | 48 ++++++++++++++++++++++++++++++------
 pkg/ottl/ottlfuncs/README.md | 32 ------------------------
 2 files changed, 40 insertions(+), 40 deletions(-)

diff --git a/pkg/ottl/CONTRIBUTING.md b/pkg/ottl/CONTRIBUTING.md
index c644810d2c49..0bd2d24a7a82 100644
--- a/pkg/ottl/CONTRIBUTING.md
+++ b/pkg/ottl/CONTRIBUTING.md
@@ -6,20 +6,52 @@ This guide is specific to the OpenTelemetry Transformation Language. All guidel
 - Changes to the OpenTelemetry Transformation Language should be made independent of any component that depends on the package. Whenever possible, try not to submit PRs that change both the OTTL and a dependent component. Instead, submit a PR that updates the OTTL and then, once merged, update the component as needed.
 
-## New Values
+## Adding New Editors/Converters
 
-When adding new values to the grammar you must:
+Before raising a PR with a new Editor or Converter, raise an issue to verify its acceptance. While acceptance depends strongly on the specific use case, consider these guidelines for early assessment.
 
-1. Update the `Value` struct with the new value. This may also mean adding new token(s) to the lexer.
-2. Update `NewFunctionCall` to be able to handle calling functions with this new value.
-3. Update `NewGetter` to be able to handle the new value.
-4. Add new unit tests.
+Your proposal likely will be accepted if:
+
+- The proposed functionality is missing,
+- The proposed solution significantly improves user experience and readability for very common use cases,
+- The proposed solution is more performant in cases where it is possible to achieve the same result with existing options.
+
+It will be up for discussion if your proposal solves an issue that can be achieved in another way but does not improve user experience or performance.
+
+Your proposal likely won't be accepted if:
+
+- User experience is worse and assumes a highly technical user,
+- The performance of your proposal very negatively affects the processing pipeline.
+
+As with code, OTTL aims for readability first. This means:
+
+- Using short, meaningful, and descriptive names,
+- Ensuring naming consistency across Editors and Converters,
+- Avoiding deep nesting to achieve desired transformations,
+- Ensuring Editors and Converters have a single responsibility.
+
+### Implementation guidelines
 
 All new functions must be added via a new file. Function files must start with `func_`. Functions must be placed in `ottlfuncs`.
 
 Unit tests must be added for all new functions. Unit test files must start with `func_` and end in `_test`. Unit tests must be placed in the same directory as the function. Functions that are not specific to a pipeline should be tested independently of any specific pipeline. Functions that are specific to a pipeline should be tested against that pipeline. End-to-end tests must be added in the `e2e` directory.
 
-Function names should follow the [Function Syntax Guidelines](ottlfuncs/README.md#function-syntax)
+#### Naming and Parameter Guidelines
+
+Functions should be named and formatted according to the following standards.
+
+- Function names MUST start with a verb unless it is a Factory that creates a new type.
+- Converters MUST be UpperCamelCase.
+- Function names that contain multiple words MUST separate those words with `_`.
+- Functions that interact with multiple items MUST have plurality in the name. Ex: `truncate_all`, `keep_keys`, `replace_all_matches`.
+- Functions that interact with a single item MUST NOT have plurality in the name. If a function would interact with multiple items due to a condition, like `where`, it is still considered singular. Ex: `set`, `delete`, `replace_match`.
+- Functions that change a specific target MUST set the target as the first parameter.
 
+## New Values
+
+When adding new values to the grammar you must:
+
+1. Update the `Value` struct with the new value. This may also mean adding new token(s) to the lexer.
+2. Update `NewFunctionCall` to be able to handle calling functions with this new value.
+3. Update `NewGetter` to be able to handle the new value.
+4. Add new unit tests.
diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md
index a1ef094b4265..8272d0d19d8c 100644
--- a/pkg/ottl/ottlfuncs/README.md
+++ b/pkg/ottl/ottlfuncs/README.md
@@ -2127,35 +2127,3 @@ The returned type is `int64`.
 Examples:
 
 - `Year(Now())`
-
-## Function syntax
-
-Functions should be named and formatted according to the following standards.
-
-- Function names MUST start with a verb unless it is a Factory that creates a new type.
-- Converters MUST be UpperCamelCase.
-- Function names that contain multiple words MUST separate those words with `_`.
-- Functions that interact with multiple items MUST have plurality in the name. Ex: `truncate_all`, `keep_keys`, `replace_all_matches`.
-- Functions that interact with a single item MUST NOT have plurality in the name. If a function would interact with multiple items due to a condition, like `where`, it is still considered singular. Ex: `set`, `delete`, `replace_match`.
-- Functions that change a specific target MUST set the target as the first parameter.
-
-## Adding New Editors/Converters
-
-Before raising a PR with a new Editor or Converter, raise an issue to verify its acceptance. While acceptance is strongly specific to a specific use case, consider these guidelines for early assessment.
-
-Your proposal likely will be accepted if:
-- The proposed functionality is missing,
-- The proposed solution significantly improves user experience and readability for very common use cases,
-- The proposed solution is more performant in cases where it is possible to achieve the same result with existing options.
-
-It will be up for discussion if your proposal solves an issue that can be achieved in another way but does not improve user experience or performance.
-
-Your proposal likely won't be accepted if:
-- User experience is worse and assumes a highly technical user,
-- The performance of your proposal very negatively affects the processing pipeline.
-
-As with code, OTTL aims for readability first. This means:
-- Using short, meaningful, and descriptive names,
-- Ensuring naming consistency across Editors and Converters,
-- Avoiding deep nesting to achieve desired transformations,
-- Ensuring Editors and Converters have a single responsibility.

From ad7f371f1e632f166cb1e7acbad3b31fc6bff12d Mon Sep 17 00:00:00 2001
From: Florian Bacher
Date: Thu, 5 Dec 2024 18:55:44 +0100
Subject: [PATCH 6/7] [processor/k8sattributes]: log error encountered during
 kube client initialisation (#36385)

#### Description
This PR adds more log output to the k8s attributes processor to log any errors that are encountered during the kube client initialisation, to make troubleshooting and identifying such issues easier.

#### Link to tracking issue
Fixes #35879

---------

Signed-off-by: Florian Bacher
---
 .../k8sattributes-k8s-client-init-log.yaml    | 29 +++++++++++++++++++
 processor/k8sattributesprocessor/processor.go |  2 ++
 2 files changed, 31 insertions(+)
 create mode 100644 .chloggen/k8sattributes-k8s-client-init-log.yaml

diff --git a/.chloggen/k8sattributes-k8s-client-init-log.yaml b/.chloggen/k8sattributes-k8s-client-init-log.yaml
new file mode 100644
index 000000000000..f1c67ff28319
--- /dev/null
+++ b/.chloggen/k8sattributes-k8s-client-init-log.yaml
@@ -0,0 +1,29 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: k8sattributesprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Log any errors encountered during kube client initialisation
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35879]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  This addresses an issue where the collector, due to an error encountered during the kubernetes client initialisation,
+  was reporting an 'unavailable' status via the health check extension without any further information to be found in the logs.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]' +change_logs: [] diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 98499f5e5473..9fa753f95fe7 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -62,6 +62,7 @@ func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) err for _, opt := range allOptions { if err := opt(kp); err != nil { + kp.logger.Error("Could not apply option", zap.Error(err)) componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) return err } @@ -71,6 +72,7 @@ func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) err if kp.kc == nil { err := kp.initKubeClient(kp.telemetrySettings, kubeClientProvider) if err != nil { + kp.logger.Error("Could not initialize kube client", zap.Error(err)) componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) return err } From 396c63d0ad4f696d694ff09e40b0abc4cceb8f15 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 19:20:40 +0100 Subject: [PATCH 7/7] [chore] remove converter type from stanza (#36288) #### Description This PR removes the `Converter` type that was previously used mainly by the stanza receiver adapter (see https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35669#issuecomment-2443455752 for more details). Two other receivers were still using the converter to generate test data within the unit tests, so those have been adapted as well with this PR #### Link to tracking issue Follow up to https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35453 #### Testing Adapted unit tests that were still using the converter --------- Signed-off-by: Florian Bacher --- pkg/stanza/adapter/converter.go | 184 ---------------- pkg/stanza/adapter/converter_test.go | 219 ++----------------- receiver/filelogreceiver/filelog_test.go | 77 +++---- receiver/filelogreceiver/go.mod | 2 + receiver/namedpipereceiver/namedpipe_test.go | 4 - 5 files changed, 46 insertions(+), 440 deletions(-) diff --git a/pkg/stanza/adapter/converter.go b/pkg/stanza/adapter/converter.go index 3ab508745bc3..a81fd8f00a42 100644 --- a/pkg/stanza/adapter/converter.go +++ b/pkg/stanza/adapter/converter.go @@ -4,162 +4,19 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" import ( - "context" "encoding/binary" "encoding/json" - "errors" "fmt" - "math" - "runtime" "sort" "sync" "github.com/cespare/xxhash/v2" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" ) -// Converter converts a batch of entry.Entry into plog.Logs aggregating translated -// entries into logs coming from the same Resource. 
-// -// The diagram below illustrates the internal communication inside the Converter: -// -// ┌─────────────────────────────────┐ -// │ Batch() │ -// ┌─────────┤ Ingests batches of log entries │ -// │ │ and sends them onto workerChan │ -// │ └─────────────────────────────────┘ -// │ -// │ ┌───────────────────────────────────────────────────┐ -// ├─► workerLoop() │ -// │ │ ┌─────────────────────────────────────────────────┴─┐ -// ├─┼─► workerLoop() │ -// │ │ │ ┌─────────────────────────────────────────────────┴─┐ -// └─┼─┼─► workerLoop() │ -// └─┤ │ consumes sent log entries from workerChan, │ -// │ │ translates received entries to plog.LogRecords, │ -// └─┤ and sends them on flushChan │ -// └─────────────────────────┬─────────────────────────┘ -// │ -// ▼ -// ┌─────────────────────────────────────────────────────┐ -// │ flushLoop() │ -// │ receives log records from flushChan and sends │ -// │ them onto pLogsChan which is consumed by │ -// │ downstream consumers via OutChannel() │ -// └─────────────────────────────────────────────────────┘ -type Converter struct { - set component.TelemetrySettings - - // pLogsChan is a channel on which aggregated logs will be sent to. - pLogsChan chan plog.Logs - - stopOnce sync.Once - - // converterChan is an internal communication channel signaling stop was called - // prevents sending to closed channels - converterChan chan struct{} - - // workerChan is an internal communication channel that gets the log - // entries from Batch() calls and it receives the data in workerLoop(). - workerChan chan []*entry.Entry - // workerCount configures the amount of workers started. - workerCount int - - // flushChan is an internal channel used for transporting batched plog.Logs. - flushChan chan plog.Logs - - // wg is a WaitGroup that makes sure that we wait for spun up goroutines exit - // when Stop() is called. - wg sync.WaitGroup - - // flushWg is a WaitGroup that makes sure that we wait for flush loop to exit - // when Stop() is called. - flushWg sync.WaitGroup -} - -type converterOption interface { - apply(*Converter) -} - -func withWorkerCount(workerCount int) converterOption { - return workerCountOption{workerCount} -} - -type workerCountOption struct { - workerCount int -} - -func (o workerCountOption) apply(c *Converter) { - c.workerCount = o.workerCount -} - -func NewConverter(set component.TelemetrySettings, opts ...converterOption) *Converter { - set.Logger = set.Logger.With(zap.String("component", "converter")) - c := &Converter{ - set: set, - workerChan: make(chan []*entry.Entry), - workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))), - pLogsChan: make(chan plog.Logs), - converterChan: make(chan struct{}), - flushChan: make(chan plog.Logs), - } - for _, opt := range opts { - opt.apply(c) - } - return c -} - -func (c *Converter) Start() { - c.set.Logger.Debug("Starting log converter", zap.Int("worker_count", c.workerCount)) - - c.wg.Add(c.workerCount) - for i := 0; i < c.workerCount; i++ { - go c.workerLoop() - } - - c.flushWg.Add(1) - go c.flushLoop() -} - -func (c *Converter) Stop() { - c.stopOnce.Do(func() { - close(c.converterChan) - - // close workerChan and wait for entries to be processed - close(c.workerChan) - c.wg.Wait() - - // close flushChan and wait for flush loop to finish - close(c.flushChan) - c.flushWg.Wait() - - // close pLogsChan so callers can stop processing - close(c.pLogsChan) - }) -} - -// OutChannel returns the channel on which converted entries will be sent to. 
-func (c *Converter) OutChannel() <-chan plog.Logs { - return c.pLogsChan -} - -// workerLoop is responsible for obtaining log entries from Batch() calls, -// converting them to plog.LogRecords batched by Resource, and sending them -// on flushChan. -func (c *Converter) workerLoop() { - defer c.wg.Done() - - for entries := range c.workerChan { - // Send plogs directly to flushChan - c.flushChan <- ConvertEntries(entries) - } -} - func ConvertEntries(entries []*entry.Entry) plog.Logs { resourceHashToIdx := make(map[uint64]int) scopeIdxByResource := make(map[uint64]map[string]int) @@ -197,47 +54,6 @@ func ConvertEntries(entries []*entry.Entry) plog.Logs { return pLogs } -func (c *Converter) flushLoop() { - defer c.flushWg.Done() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - for pLogs := range c.flushChan { - if err := c.flush(ctx, pLogs); err != nil { - c.set.Logger.Debug("Problem sending log entries", - zap.Error(err), - ) - } - } -} - -// flush flushes provided plog.Logs entries onto a channel. -func (c *Converter) flush(ctx context.Context, pLogs plog.Logs) error { - doneChan := ctx.Done() - - select { - case <-doneChan: - return fmt.Errorf("flushing log entries interrupted, err: %w", ctx.Err()) - - case c.pLogsChan <- pLogs: - } - - return nil -} - -// Batch takes in an entry.Entry and sends it to an available worker for processing. -func (c *Converter) Batch(e []*entry.Entry) error { - // in case Stop was called do not process batch - select { - case <-c.converterChan: - return errors.New("logs converter has been stopped") - default: - } - - c.workerChan <- e - return nil -} - // convert converts one entry.Entry into plog.LogRecord allocating it. func convert(ent *entry.Entry) plog.LogRecord { dest := plog.NewLogRecord() diff --git a/pkg/stanza/adapter/converter_test.go b/pkg/stanza/adapter/converter_test.go index a56a21f94eb2..a527e3bddb84 100644 --- a/pkg/stanza/adapter/converter_test.go +++ b/pkg/stanza/adapter/converter_test.go @@ -4,20 +4,15 @@ package adapter import ( - "context" "fmt" "sort" "strconv" - "sync" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap/zaptest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" ) @@ -42,10 +37,6 @@ func BenchmarkConvertComplex(b *testing.B) { } } -func complexEntries(count int) []*entry.Entry { - return complexEntriesForNDifferentHosts(count, 1) -} - func complexEntriesForNDifferentHosts(count int, n int) []*entry.Entry { ret := make([]*entry.Entry, count) for i := 0; i < count; i++ { @@ -392,163 +383,25 @@ func TestAllConvertedEntriesScopeGrouping(t *testing.T) { t.Run(strconv.Itoa(i), func(t *testing.T) { t.Parallel() - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - - go func() { - entries := complexEntriesForNDifferentHostsMDifferentScopes(100, 1, tc.numberOFScopes) - assert.NoError(t, converter.Batch(entries)) - }() - - var ( - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - select { - case pLogs, ok := <-ch: - if !ok { - break - } + entries := complexEntriesForNDifferentHostsMDifferentScopes(100, 1, tc.numberOFScopes) - rLogs := pLogs.ResourceLogs() - rLog := rLogs.At(0) + pLogs := 
ConvertEntries(entries) - ills := rLog.ScopeLogs() - require.Equal(t, ills.Len(), tc.numberOFScopes) + rLogs := pLogs.ResourceLogs() + rLog := rLogs.At(0) - for i := 0; i < tc.numberOFScopes; i++ { - sl := ills.At(i) - require.Equal(t, sl.Scope().Name(), fmt.Sprintf("scope-%d", i%tc.numberOFScopes)) - require.Equal(t, sl.LogRecords().Len(), tc.logsPerScope) - } + ills := rLog.ScopeLogs() + require.Equal(t, ills.Len(), tc.numberOFScopes) - case <-timeoutTimer.C: - break + for i := 0; i < tc.numberOFScopes; i++ { + sl := ills.At(i) + require.Equal(t, sl.Scope().Name(), fmt.Sprintf("scope-%d", i%tc.numberOFScopes)) + require.Equal(t, sl.LogRecords().Len(), tc.logsPerScope) } }) } } -func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) { - t.Parallel() - - testcases := []struct { - entries int - maxFlushCount uint - }{ - { - entries: 10, - maxFlushCount: 10, - }, - { - entries: 10, - maxFlushCount: 3, - }, - { - entries: 100, - maxFlushCount: 20, - }, - } - - for i, tc := range testcases { - tc := tc - - t.Run(strconv.Itoa(i), func(t *testing.T) { - t.Parallel() - - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - - go func() { - entries := complexEntries(tc.entries) - for from := 0; from < tc.entries; from += int(tc.maxFlushCount) { - to := from + int(tc.maxFlushCount) - if to > tc.entries { - to = tc.entries - } - assert.NoError(t, converter.Batch(entries[from:to])) - } - }() - - var ( - actualCount int - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - forLoop: - for { - if tc.entries == actualCount { - break - } - - select { - case pLogs, ok := <-ch: - if !ok { - break forLoop - } - - rLogs := pLogs.ResourceLogs() - require.Equal(t, 1, rLogs.Len()) - - rLog := rLogs.At(0) - ills := rLog.ScopeLogs() - require.Equal(t, 1, ills.Len()) - - sl := ills.At(0) - - actualCount += sl.LogRecords().Len() - - assert.LessOrEqual(t, uint(sl.LogRecords().Len()), tc.maxFlushCount, - "Received more log records in one flush than configured by maxFlushCount", - ) - - case <-timeoutTimer.C: - break forLoop - } - } - - assert.Equal(t, tc.entries, actualCount, - "didn't receive expected number of entries after conversion", - ) - }) - } -} - -func TestConverterCancelledContextCancellsTheFlush(t *testing.T) { - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - var wg sync.WaitGroup - wg.Add(1) - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - go func() { - defer wg.Done() - pLogs := plog.NewLogs() - ills := pLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty() - - lr := convert(complexEntry()) - lr.CopyTo(ills.LogRecords().AppendEmpty()) - - assert.Error(t, converter.flush(ctx, pLogs)) - }() - wg.Wait() -} - func TestConvertMetadata(t *testing.T) { now := time.Now() @@ -946,55 +799,17 @@ func BenchmarkConverter(b *testing.B) { for _, wc := range workerCounts { b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) { for i := 0; i < b.N; i++ { - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(b) - converter := NewConverter(set, withWorkerCount(wc)) - converter.Start() - defer converter.Stop() - b.ReportAllocs() - go func() { - for from := 0; from < entryCount; from += int(batchSize) { - to := from + int(batchSize) - if to > entryCount { - to = 
entryCount - } - assert.NoError(b, converter.Batch(entries[from:to])) - } - }() - - var ( - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - var n int - forLoop: - for { - if n == entryCount { - break - } - - select { - case pLogs, ok := <-ch: - if !ok { - break forLoop - } - - rLogs := pLogs.ResourceLogs() - require.Equal(b, hostsCount, rLogs.Len()) - n += pLogs.LogRecordCount() - - case <-timeoutTimer.C: - break forLoop + for from := 0; from < entryCount; from += int(batchSize) { + to := from + int(batchSize) + if to > entryCount { + to = entryCount } + pLogs := ConvertEntries(entries[from:to]) + rLogs := pLogs.ResourceLogs() + require.Equal(b, hostsCount, rLogs.Len()) } - - assert.Equal(b, entryCount, n, - "didn't receive expected number of entries after conversion", - ) } }) } diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 6d9cb8f7f577..a49a7d780286 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -12,7 +12,6 @@ import ( "path/filepath" "runtime" "strconv" - "sync" "sync/atomic" "testing" "time" @@ -28,6 +27,7 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" @@ -83,44 +83,50 @@ func TestReadStaticFile(t *testing.T) { sink := new(consumertest.LogsSink) cfg := testdataConfigYaml() - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - defer converter.Stop() - - var wg sync.WaitGroup - wg.Add(1) - go consumeNLogsFromConverter(converter.OutChannel(), 3, &wg) - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) + expectedLogs := []plog.Logs{} // Build the expected set by using adapter.Converter to translate entries // to pdata Logs. 
- queueEntry := func(t *testing.T, c *adapter.Converter, msg string, severity entry.Severity) { + entries := []*entry.Entry{} + queueEntry := func(msg string, severity entry.Severity) { e := entry.New() e.Timestamp = expectedTimestamp - require.NoError(t, e.Set(entry.NewBodyField("msg"), msg)) + e.Body = fmt.Sprintf("2020-08-25 %s %s", severity.String(), msg) e.Severity = severity - e.AddAttribute("file_name", "simple.log") - require.NoError(t, c.Batch([]*entry.Entry{e})) + e.AddAttribute("log.file.name", "simple.log") + e.AddAttribute("time", "2020-08-25") + e.AddAttribute("sev", severity.String()) + e.AddAttribute("msg", msg) + entries = append(entries, e) } - queueEntry(t, converter, "Something routine", entry.Info) - queueEntry(t, converter, "Something bad happened!", entry.Error) - queueEntry(t, converter, "Some details...", entry.Debug) + queueEntry("Something routine", entry.Info) + queueEntry("Something bad happened!", entry.Error) + queueEntry("Some details...", entry.Debug) + + expectedLogs = append(expectedLogs, adapter.ConvertEntries(entries)) dir, err := os.Getwd() require.NoError(t, err) t.Logf("Working Directory: %s", dir) - wg.Wait() - require.Eventually(t, expectNLogs(sink, 3), 2*time.Second, 5*time.Millisecond, "expected %d but got %d logs", 3, sink.LogRecordCount(), ) - // TODO: Figure out a nice way to assert each logs entry content. - // require.Equal(t, expectedLogs, sink.AllLogs()) + + for i, expectedLog := range expectedLogs { + require.NoError(t, + plogtest.CompareLogs( + expectedLog, + sink.AllLogs()[i], + plogtest.IgnoreObservedTimestamp(), + plogtest.IgnoreTimestamp(), + ), + ) + } require.NoError(t, rcvr.Shutdown(context.Background())) } @@ -168,15 +174,6 @@ func (rt *rotationTest) Run(t *testing.T) { fileName := filepath.Join(tempDir, "test.log") backupFileName := filepath.Join(tempDir, "test-backup.log") - // Build expected outputs - expectedTimestamp, _ := time.ParseInLocation("2006-01-02", "2020-08-25", time.Local) - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - - var wg sync.WaitGroup - wg.Add(1) - go consumeNLogsFromConverter(converter.OutChannel(), numLogs, &wg) - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) @@ -224,40 +221,20 @@ func (rt *rotationTest) Run(t *testing.T) { msg := fmt.Sprintf("This is a simple log line with the number %3d", i) - // Build the expected set by converting entries to pdata Logs... - e := entry.New() - e.Timestamp = expectedTimestamp - require.NoError(t, e.Set(entry.NewBodyField("msg"), msg)) - require.NoError(t, converter.Batch([]*entry.Entry{e})) - // ... and write the logs lines to the actual file consumed by receiver. _, err := file.WriteString(fmt.Sprintf("2020-08-25 %s\n", msg)) require.NoError(t, err) time.Sleep(time.Millisecond) } - wg.Wait() require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, 10*time.Millisecond, "expected %d but got %d logs", numLogs, sink.LogRecordCount(), ) + // TODO: Figure out a nice way to assert each logs entry content. 
// require.Equal(t, expectedLogs, sink.AllLogs()) require.NoError(t, rcvr.Shutdown(context.Background())) - converter.Stop() -} - -func consumeNLogsFromConverter(ch <-chan plog.Logs, count int, wg *sync.WaitGroup) { - defer wg.Done() - - n := 0 - for pLog := range ch { - n += pLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len() - - if n == count { - return - } - } } func expectNLogs(sink *consumertest.LogsSink, expected int) func() bool { diff --git a/receiver/filelogreceiver/go.mod b/receiver/filelogreceiver/go.mod index 0b83f6d48fb5..43e705cd1e03 100644 --- a/receiver/filelogreceiver/go.mod +++ b/receiver/filelogreceiver/go.mod @@ -19,6 +19,7 @@ require ( ) require ( + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.115.0 go.opentelemetry.io/collector/component/componenttest v0.115.0 go.opentelemetry.io/collector/consumer/consumertest v0.115.0 go.opentelemetry.io/collector/pipeline v0.115.0 @@ -51,6 +52,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.115.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/valyala/fastjson v1.6.4 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect diff --git a/receiver/namedpipereceiver/namedpipe_test.go b/receiver/namedpipereceiver/namedpipe_test.go index c7a4f25bea80..a13bc83e28c6 100644 --- a/receiver/namedpipereceiver/namedpipe_test.go +++ b/receiver/namedpipereceiver/namedpipe_test.go @@ -55,10 +55,6 @@ func TestReadPipe(t *testing.T) { sink := new(consumertest.LogsSink) cfg := testdataConfigYaml() - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - defer converter.Stop() - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
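
As a closing note on patch 7: code that previously pumped entries through the removed `Converter` (Start, Batch, OutChannel, Stop) can now make a single synchronous call. A minimal sketch, using only the APIs visible in the diffs above (the `main` wrapper is illustrative, not part of the patch):

```go
package main

import (
	"fmt"
	"time"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
)

func main() {
	// Build stanza entries directly, as the adapted tests now do.
	e := entry.New()
	e.Timestamp = time.Now().UTC()
	e.Body = "Something routine"
	e.AddAttribute("log.file.name", "simple.log")

	// Previously: NewConverter, Start, Batch, read plog.Logs from OutChannel, Stop.
	// Now: one synchronous call that groups entries by Resource and Scope.
	pLogs := adapter.ConvertEntries([]*entry.Entry{e})

	fmt.Println(pLogs.LogRecordCount()) // 1
}
```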