Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
jackgopack4 authored Jul 23, 2024
2 parents 1adde1d + 4e44e32 commit 36fc7e6
Show file tree
Hide file tree
Showing 20 changed files with 185 additions and 240 deletions.
18 changes: 18 additions & 0 deletions .chloggen/componenttest-extra-attributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: component/componenttest

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add optional ...attribute.KeyValue argument to TestTelemetry.CheckExporterMetricGauge.

# One or more tracking issues or pull requests related to the change
issues: [10593]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
20 changes: 20 additions & 0 deletions .chloggen/exporterhelper-report-data-type-in-queue-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add data_type attribute to `otelcol_exporter_queue_size` metric to report the type of data being processed.

# One or more tracking issues or pull requests related to the change
issues: [9943]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
25 changes: 25 additions & 0 deletions .chloggen/service-remove-ballast-deps-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processor/memorylimiter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The memory limiter processor will no longer account for ballast size.

# One or more tracking issues or pull requests related to the change
issues: [10696]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
25 changes: 25 additions & 0 deletions .chloggen/service-remove-ballast-deps-3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: extension/memorylimiter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The memory limiter extension will no longer account for ballast size.

# One or more tracking issues or pull requests related to the change
issues: [10696]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
25 changes: 25 additions & 0 deletions .chloggen/service-remove-ballast-deps.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The service will no longer be able to get a ballast size from the deprecated ballast extension.

# One or more tracking issues or pull requests related to the change
issues: [10696]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
7 changes: 5 additions & 2 deletions component/componenttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/attribute"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -72,8 +73,10 @@ func (tts *TestTelemetry) CheckExporterLogs(sentLogRecords, sendFailedLogRecords
return tts.prometheusChecker.checkExporterLogs(tts.id, sentLogRecords, sendFailedLogRecords)
}

func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64) error {
return tts.prometheusChecker.checkExporterMetricGauge(tts.id, metric, val)
func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, extraAttrs ...attribute.KeyValue) error {
attrs := attributesForExporterMetrics(tts.id)
attrs = append(attrs, extraAttrs...)
return tts.prometheusChecker.checkGauge(metric, val, attrs)
}

// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.
Expand Down
6 changes: 2 additions & 4 deletions component/componenttest/otelprometheuschecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,8 @@ func (pc *prometheusChecker) checkExporterEnqueueFailed(exporter component.ID, d
return pc.checkCounter(fmt.Sprintf("exporter_enqueue_failed_%s", datatype), enqueueFailed, exporterAttrs)
}

func (pc *prometheusChecker) checkExporterMetricGauge(exporter component.ID, metric string, val int64) error {
exporterAttrs := attributesForExporterMetrics(exporter)

ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, exporterAttrs)
func (pc *prometheusChecker) checkGauge(metric string, val int64, attrs []attribute.KeyValue) error {
ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, attrs)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func WithQueue(config QueueSettings) Option {
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
})
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder)
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
}
}
Expand All @@ -132,7 +132,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
DataType: o.signal,
ExporterSettings: o.set,
}
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder)
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
}
}
Expand Down Expand Up @@ -250,7 +250,7 @@ type baseExporter struct {
}

func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set, DataType: signal})
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion exporter/exporterhelper/obsexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ObsReport struct {
level configtelemetry.Level
spanNamePrefix string
tracer trace.Tracer
dataType component.DataType

otelAttrs []attribute.KeyValue
telemetryBuilder *metadata.TelemetryBuilder
Expand All @@ -38,6 +39,7 @@ type ObsReport struct {
type ObsReportSettings struct {
ExporterID component.ID
ExporterCreateSettings exporter.Settings
DataType component.DataType
}

// NewObsReport creates a new Exporter.
Expand All @@ -58,7 +60,7 @@ func newExporter(cfg ObsReportSettings) (*ObsReport, error) {
level: cfg.ExporterCreateSettings.TelemetrySettings.MetricsLevel,
spanNamePrefix: obsmetrics.ExporterPrefix + cfg.ExporterID.String(),
tracer: cfg.ExporterCreateSettings.TracerProvider.Tracer(cfg.ExporterID.String()),

dataType: cfg.DataType,
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ExporterKey, cfg.ExporterID.String()),
},
Expand Down
1 change: 0 additions & 1 deletion exporter/exporterhelper/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
)

func TestExportEnqueueFailure(t *testing.T) {
exporterID := component.MustNewID("fakeExporter")
tt, err := componenttest.SetupTelemetry(exporterID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
Expand Down
25 changes: 13 additions & 12 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
Expand Down Expand Up @@ -74,18 +73,18 @@ type queueSender struct {
traceAttribute attribute.KeyValue
consumers *queue.Consumers[Request]

telemetryBuilder *metadata.TelemetryBuilder
exporterID component.ID
obsrep *ObsReport
exporterID component.ID
}

func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numConsumers int,
exportFailureMessage string, telemetryBuilder *metadata.TelemetryBuilder) *queueSender {
exportFailureMessage string, obsrep *ObsReport) *queueSender {
qs := &queueSender{
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
telemetryBuilder: telemetryBuilder,
exporterID: set.ID,
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
obsrep: obsrep,
exporterID: set.ID,
}
consumeFunc := func(ctx context.Context, req Request) error {
err := qs.nextSender.send(ctx, req)
Expand All @@ -105,10 +104,12 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
return err
}

opts := metric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.exporterID.String())))
dataTypeAttr := attribute.String(obsmetrics.DataTypeKey, qs.obsrep.dataType.String())
return multierr.Append(
qs.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, opts),
qs.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, opts),
qs.obsrep.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))),
qs.obsrep.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))),
)
}

Expand Down
53 changes: 31 additions & 22 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
Expand Down Expand Up @@ -202,28 +203,33 @@ func TestQueuedRetryHappyPath(t *testing.T) {
})
}
}
func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
tt, err := componenttest.SetupTelemetry(defaultID)
require.NoError(t, err)

qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender,
withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize)))
func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
dataTypes := []component.DataType{component.DataTypeLogs, component.DataTypeTraces, component.DataTypeMetrics}
for _, dataType := range dataTypes {
tt, err := componenttest.SetupTelemetry(defaultID)
require.NoError(t, err)

qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, dataType, newObservabilityConsumerSender,
withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize)))

for i := 0; i < 7; i++ {
require.NoError(t, be.send(context.Background(), newErrorRequest()))
}
require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7),
attribute.String(obsmetrics.DataTypeKey, dataType.String())))

for i := 0; i < 7; i++ {
require.NoError(t, be.send(context.Background(), newErrorRequest()))
assert.NoError(t, be.Shutdown(context.Background()))
}
require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7)))

assert.NoError(t, be.Shutdown(context.Background()))
}

func TestNoCancellationContext(t *testing.T) {
Expand Down Expand Up @@ -426,9 +432,12 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
func TestQueueSenderNoStartShutdown(t *testing.T) {
queue := queue.NewBoundedMemoryQueue[Request](queue.MemoryQueueSettings[Request]{})
set := exportertest.NewNopSettings()
builder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
obsrep, err := NewObsReport(ObsReportSettings{
ExporterID: exporterID,
ExporterCreateSettings: exportertest.NewNopSettings(),
})
assert.NoError(t, err)
qs := newQueueSender(queue, set, 1, "", builder)
qs := newQueueSender(queue, set, 1, "", obsrep)
assert.NoError(t, qs.Shutdown(context.Background()))
}

Expand Down
Loading

0 comments on commit 36fc7e6

Please sign in to comment.