diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go index c6055df0673..6e8a36f9ef4 100644 --- a/exporter/exporterhelper/retry_sender.go +++ b/exporter/exporterhelper/retry_sender.go @@ -17,7 +17,7 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/internal/queue" + "go.opentelemetry.io/collector/exporter/internal/experr" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) @@ -127,7 +127,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error { case <-ctx.Done(): return fmt.Errorf("request is cancelled or timed out %w", err) case <-rs.stopCh: - return queue.NewShutdownErr(err) + return experr.NewShutdownErr(err) case <-time.After(backoffDelay): } } diff --git a/exporter/internal/queue/err.go b/exporter/internal/experr/err.go similarity index 63% rename from exporter/internal/queue/err.go rename to exporter/internal/experr/err.go index a3b30ac9604..6bff64b162d 100644 --- a/exporter/internal/queue/err.go +++ b/exporter/internal/experr/err.go @@ -1,7 +1,9 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" +package experr // import "go.opentelemetry.io/collector/exporter/internal/experr" + +import "errors" type shutdownErr struct { err error @@ -18,3 +20,8 @@ func (s shutdownErr) Error() string { func (s shutdownErr) Unwrap() error { return s.err } + +func IsShutdownErr(err error) bool { + var sdErr shutdownErr + return errors.As(err, &sdErr) +} diff --git a/exporter/internal/experr/err_test.go b/exporter/internal/experr/err_test.go new file mode 100644 index 00000000000..ac0580025e5 --- /dev/null +++ b/exporter/internal/experr/err_test.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package experr + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewShutdownErr(t *testing.T) { + err := NewShutdownErr(errors.New("some error")) + assert.Equal(t, "interrupted due to shutdown: some error", err.Error()) +} + +func TestIsShutdownErr(t *testing.T) { + err := errors.New("testError") + require.False(t, IsShutdownErr(err)) + err = NewShutdownErr(err) + require.True(t, IsShutdownErr(err)) +} diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index bdcbeb07641..b6e5d2be4dd 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/internal/experr" "go.opentelemetry.io/collector/extension/experimental/storage" ) @@ -360,7 +361,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), pq.mu.Unlock() }() - if errors.As(consumeErr, &shutdownErr{}) { + if experr.IsShutdownErr(consumeErr) { // The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart. // TODO: Handle partially delivered requests by updating their values in the storage. return diff --git a/exporter/internal/queue/persistent_queue_test.go b/exporter/internal/queue/persistent_queue_test.go index 3edc2be55b8..2b525fa792f 100644 --- a/exporter/internal/queue/persistent_queue_test.go +++ b/exporter/internal/queue/persistent_queue_test.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/internal/experr" "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/pdata/pcommon" @@ -411,7 +412,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) { } assert.Equal(t, 3, ps.Size()) require.True(t, ps.Consume(func(context.Context, tracesRequest) error { - return NewShutdownErr(nil) + return experr.NewShutdownErr(nil) })) assert.Equal(t, 2, ps.Size()) @@ -523,7 +524,7 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) { // put one more item in require.NoError(t, ps.Offer(context.Background(), req)) require.Equal(t, 5, ps.Size()) - return NewShutdownErr(nil) + return experr.NewShutdownErr(nil) })) assert.NoError(t, ps.Shutdown(context.Background()))