Skip to content

Commit

Permalink
[breaking][pkg/stanza] Pass TelemetrySettings to Build of Builder int…
Browse files Browse the repository at this point in the history
…erface (open-telemetry#32662)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
This PR resumes the work from
open-telemetry#31618.

The goal is to pass in the `component.TelemetrySettings` so as to use
them later in various ways:
1) report the filelog's state stats:
open-telemetry#31544
2) switch from `SugaredLogger` to `Logger`:
open-telemetry#32177


More about the breaking change decision at
open-telemetry#31618 (comment).

**Link to tracking Issue:** <Issue number if applicable>
open-telemetry#31256

**Testing:** <Describe what testing was performed and which tests were
added.> Testing suite got updated.

#### Manual Testing

1.
```yaml
receivers:
  filelog:
    start_at: end
    include:
    - /var/log/busybox/refactroring_test.log
    operators:
      - id: addon
        type: add
        field: attributes.extra
        value: my-val
exporters:
  debug:
    verbosity: detailed
service:
  pipelines:
    logs:
      receivers: [filelog]
      exporters: [debug]
      processors: []
```
2. `./bin/otelcontribcol_linux_amd64 --config ~/otelcol/config.yaml`
3. `echo 'some line' >> /var/log/busybox/refactroring_test.log`
4.
```console
2024-04-24T09:29:17.104+0300	info	LogsExporter	{"kind": "exporter", "data_type": "logs", "name": "debug", "resource logs": 1, "log records": 1}
2024-04-24T09:29:17.104+0300	info	ResourceLog #0
Resource SchemaURL: 
ScopeLogs #0
ScopeLogs SchemaURL: 
InstrumentationScope  
LogRecord #0
ObservedTimestamp: 2024-04-24 06:29:17.005433317 +0000 UTC
Timestamp: 1970-01-01 00:00:00 +0000 UTC
SeverityText: 
SeverityNumber: Unspecified(0)
Body: Str(some line)
Attributes:
     -> extra: Str(my-val)
     -> log.file.name: Str(1.log)
Trace ID: 
Span ID: 
Flags: 0
	{"kind": "exporter", "data_type": "logs", "name": "debug"}
```

**Documentation:** <Describe the documentation added.> TBA.

Signed-off-by: ChrsMark <[email protected]>
  • Loading branch information
ChrsMark authored Apr 24, 2024
1 parent 9a5240e commit 6cefabc
Show file tree
Hide file tree
Showing 110 changed files with 564 additions and 337 deletions.
31 changes: 31 additions & 0 deletions .chloggen/pass_telemetry_settings_to_stanza_ops.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Pass TelemetrySettings to the Build method of the Builder interface

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32662, 31256]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The reason for this breaking change is to pass in the component.TelemetrySettings
so as to use them later in various ways:
- be able to report state statistics and telemetry in general
- be able to switch from SugaredLogger to Logger
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
5 changes: 2 additions & 3 deletions pkg/stanza/adapter/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand Down Expand Up @@ -164,8 +163,8 @@ type BenchOpConfig struct {
}

// Build will build a noop operator.
func (c BenchOpConfig) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
func (c BenchOpConfig) Build(set *component.TelemetrySettings) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(set)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/adapter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
pipe, err := pipeline.Config{
Operators: operators,
DefaultOutput: emitter,
}.Build(params.Logger.Sugar())
}.Build(&params.TelemetrySettings)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/stanza/adapter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import (
func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
emitter := helper.NewLogEmitter(zap.NewNop().Sugar())

set := componenttest.NewNopTelemetrySettings()
pipe, err := pipeline.Config{
Operators: []operator.Config{
{
Builder: noop.NewConfig(),
},
},
}.Build(zap.NewNop().Sugar())
}.Build(&set)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/stanza/adapter/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand Down Expand Up @@ -41,8 +40,8 @@ func NewUnstartableConfig() operator.Config {
}

// Build will build an unstartable operator
func (c *UnstartableConfig) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
o, _ := c.OutputConfig.Build(logger)
func (c *UnstartableConfig) Build(set *component.TelemetrySettings) (operator.Operator, error) {
o, _ := c.OutputConfig.Build(set)
return &UnstartableOperator{OutputOperator: o}, nil
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/stanza/adapter/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,11 @@ func BenchmarkReadLine(b *testing.B) {
require.NoError(b, emitter.Stop())
}()

set := componenttest.NewNopTelemetrySettings()
pipe, err := pipeline.Config{
Operators: operatorCfgs,
DefaultOutput: emitter,
}.Build(zap.NewNop().Sugar())
}.Build(&set)
require.NoError(b, err)

// Populate the file that will be consumed
Expand Down Expand Up @@ -205,10 +206,11 @@ func BenchmarkParseAndMap(b *testing.B) {
require.NoError(b, emitter.Stop())
}()

set := componenttest.NewNopTelemetrySettings()
pipe, err := pipeline.Config{
Operators: operatorCfgs,
DefaultOutput: emitter,
}.Build(zap.NewNop().Sugar())
}.Build(&set)
require.NoError(b, err)

// Populate the file that will be consumed
Expand Down
4 changes: 3 additions & 1 deletion pkg/stanza/fileconsumer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
Expand Down Expand Up @@ -191,7 +192,8 @@ func BenchmarkFileInput(b *testing.B) {
}
return nil
}
op, err := cfg.Build(testutil.Logger(b), callback)
set := componenttest.NewNopTelemetrySettings()
op, err := cfg.Build(&set, callback)
require.NoError(b, err)

b.ResetTimer()
Expand Down
16 changes: 8 additions & 8 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"runtime"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
Expand Down Expand Up @@ -89,11 +89,11 @@ type HeaderConfig struct {
}

// Deprecated [v0.97.0] Use Build and WithSplitFunc option instead
func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback, splitFunc bufio.SplitFunc) (*Manager, error) {
return c.Build(logger, emit, WithSplitFunc(splitFunc))
func (c Config) BuildWithSplitFunc(set *component.TelemetrySettings, emit emit.Callback, splitFunc bufio.SplitFunc) (*Manager, error) {
return c.Build(set, emit, WithSplitFunc(splitFunc))
}

func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback, opts ...Option) (*Manager, error) {
func (c Config) Build(set *component.TelemetrySettings, emit emit.Callback, opts ...Option) (*Manager, error) {
if err := c.validate(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback, opts ...Opt
}

readerFactory := reader.Factory{
SugaredLogger: logger.With("component", "fileconsumer"),
SugaredLogger: set.Logger.Sugar().With("component", "fileconsumer"),
FromBeginning: startAtBeginning,
FingerprintSize: int(c.FingerprintSize),
InitialBufferSize: scanner.DefaultBufferSize,
Expand All @@ -165,12 +165,12 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback, opts ...Opt

var t tracker.Tracker
if o.noTracking {
t = tracker.NewNoStateTracker(logger.With("component", "fileconsumer"), c.MaxConcurrentFiles/2)
t = tracker.NewNoStateTracker(set.Logger.Sugar().With("component", "fileconsumer"), c.MaxConcurrentFiles/2)
} else {
t = tracker.NewFileTracker(logger.With("component", "fileconsumer"), c.MaxConcurrentFiles/2)
t = tracker.NewFileTracker(set.Logger.Sugar().With("component", "fileconsumer"), c.MaxConcurrentFiles/2)
}
return &Manager{
SugaredLogger: logger.With("component", "fileconsumer"),
SugaredLogger: set.Logger.Sugar().With("component", "fileconsumer"),
readerFactory: readerFactory,
fileMatcher: fileMatcher,
pollInterval: c.PollInterval,
Expand Down
15 changes: 9 additions & 6 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
Expand All @@ -22,7 +23,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func TestNewConfig(t *testing.T) {
Expand Down Expand Up @@ -629,7 +629,8 @@ func TestBuild(t *testing.T) {
cfg := basicConfig()
tc.modifyBaseConfig(cfg)

input, err := cfg.Build(testutil.Logger(t), emittest.Nop)
set := componenttest.NewNopTelemetrySettings()
input, err := cfg.Build(&set, emittest.Nop)
tc.errorRequirement(t, err)
if err != nil {
return
Expand Down Expand Up @@ -708,7 +709,8 @@ func TestBuildWithSplitFunc(t *testing.T) {
return len(data), data, nil
}

input, err := cfg.BuildWithSplitFunc(testutil.Logger(t), emittest.Nop, splitNone)
set := componenttest.NewNopTelemetrySettings()
input, err := cfg.BuildWithSplitFunc(&set, emittest.Nop, splitNone)
tc.errorRequirement(t, err)
if err != nil {
return
Expand Down Expand Up @@ -795,7 +797,8 @@ func TestBuildWithHeader(t *testing.T) {
cfg := basicConfig()
tc.modifyBaseConfig(cfg)

input, err := cfg.Build(testutil.Logger(t), emittest.Nop)
set := componenttest.NewNopTelemetrySettings()
input, err := cfg.Build(&set, emittest.Nop)
tc.errorRequirement(t, err)
if err != nil {
return
Expand Down Expand Up @@ -848,6 +851,6 @@ func newMockOperatorConfig(cfg *Config) *mockOperatorConfig {

// This function is impelmented for compatibility with operatortest
// but is not meant to be used directly
func (h *mockOperatorConfig) Build(*zap.SugaredLogger) (operator.Operator, error) {
func (h *mockOperatorConfig) Build(_ *component.TelemetrySettings) (operator.Operator, error) {
panic("not impelemented")
}
3 changes: 2 additions & 1 deletion pkg/stanza/fileconsumer/internal/header/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"regexp"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"golang.org/x/text/encoding"

Expand Down Expand Up @@ -39,7 +40,7 @@ func NewConfig(matchRegex string, metadataOperators []operator.Config, enc encod
p, err := pipeline.Config{
Operators: metadataOperators,
DefaultOutput: newPipelineOutput(nopLogger),
}.Build(nopLogger)
}.Build(&component.TelemetrySettings{Logger: nopLogger.Desugar()})

if err != nil {
return nil, fmt.Errorf("failed to build pipelines: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/stanza/fileconsumer/internal/header/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.uber.org/zap"

Expand All @@ -31,7 +32,7 @@ func NewReader(logger *zap.SugaredLogger, cfg Config) (*Reader, error) {
r.pipeline, err = pipeline.Config{
Operators: cfg.metadataOperators,
DefaultOutput: r.output,
}.Build(logger)
}.Build(&component.TelemetrySettings{Logger: logger.Desugar()})
if err != nil {
return nil, fmt.Errorf("failed to build pipeline: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/header/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,6 @@ func TestSkipUnmatchedHeaderLine(t *testing.T) {
}

func TestNewReaderErr(t *testing.T) {
_, err := NewReader(nil, Config{})
_, err := NewReader(zaptest.NewLogger(t).Sugar(), Config{})
assert.Error(t, err)
}
5 changes: 3 additions & 2 deletions pkg/stanza/fileconsumer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func testManager(t *testing.T, cfg *Config, opts ...Option) (*Manager, *emittest.Sink) {
Expand All @@ -18,7 +18,8 @@ func testManager(t *testing.T, cfg *Config, opts ...Option) (*Manager, *emittest
}

func testManagerWithSink(t *testing.T, cfg *Config, sink *emittest.Sink, opts ...Option) *Manager {
input, err := cfg.Build(testutil.Logger(t), sink.Callback, opts...)
set := componenttest.NewNopTelemetrySettings()
input, err := cfg.Build(&set, sink.Callback, opts...)
require.NoError(t, err)
t.Cleanup(func() { input.tracker.ClosePreviousFiles() })
return input
Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"encoding/json"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.uber.org/zap"
)

// Config is the configuration of an operator
Expand All @@ -25,7 +25,7 @@ func NewConfig(b Builder) Config {
type Builder interface {
ID() string
Type() string
Build(*zap.SugaredLogger) (Operator, error)
Build(*component.TelemetrySettings) (Operator, error)
SetID(string)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/operator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
yaml "gopkg.in/yaml.v2"
)

Expand All @@ -18,7 +18,7 @@ type FakeBuilder struct {
Array []string `json:"array" yaml:"array"`
}

func (f *FakeBuilder) Build(_ *zap.SugaredLogger) (Operator, error) {
func (f *FakeBuilder) Build(_ *component.TelemetrySettings) (Operator, error) {
return nil, nil
}
func (f *FakeBuilder) ID() string { return "operator" }
Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/operator/helper/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
package helper

import (
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)
Expand Down Expand Up @@ -35,6 +35,6 @@ func newHelpersConfig() *helpersConfig {

// This function is impelmented for compatibility with operatortest
// but is not meant to be used directly
func (h *helpersConfig) Build(*zap.SugaredLogger) (operator.Operator, error) {
func (h *helpersConfig) Build(_ *component.TelemetrySettings) (operator.Operator, error) {
panic("not impelemented")
}
6 changes: 3 additions & 3 deletions pkg/stanza/operator/helper/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package helper // import "github.com/open-telemetry/opentelemetry-collector-cont
import (
"context"

"go.uber.org/zap"
"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/errors"
Expand All @@ -29,8 +29,8 @@ type InputConfig struct {
}

// Build will build a base producer.
func (c InputConfig) Build(logger *zap.SugaredLogger) (InputOperator, error) {
writerOperator, err := c.WriterConfig.Build(logger)
func (c InputConfig) Build(set *component.TelemetrySettings) (InputOperator, error) {
writerOperator, err := c.WriterConfig.Build(set)
if err != nil {
return InputOperator{}, errors.WithDetails(err, "operator_id", c.ID())
}
Expand Down
Loading

0 comments on commit 6cefabc

Please sign in to comment.