Skip to content

Commit

Permalink
[chore] [exporterhelper] Move shutdown error from queue package (open…
Browse files Browse the repository at this point in the history
…-telemetry#9554)

The error is created by the retry sender and used by the queue sender.
It doesn't belong to queue package
  • Loading branch information
dmitryax authored Feb 14, 2024
1 parent d455bff commit cc88aee
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 6 deletions.
4 changes: 2 additions & 2 deletions exporter/exporterhelper/retry_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/exporter/internal/experr"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

Expand Down Expand Up @@ -127,7 +127,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
case <-ctx.Done():
return fmt.Errorf("request is cancelled or timed out %w", err)
case <-rs.stopCh:
return queue.NewShutdownErr(err)
return experr.NewShutdownErr(err)
case <-time.After(backoffDelay):
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
package experr // import "go.opentelemetry.io/collector/exporter/internal/experr"

import "errors"

type shutdownErr struct {
err error
Expand All @@ -18,3 +20,8 @@ func (s shutdownErr) Error() string {
func (s shutdownErr) Unwrap() error {
return s.err
}

func IsShutdownErr(err error) bool {
var sdErr shutdownErr
return errors.As(err, &sdErr)
}
24 changes: 24 additions & 0 deletions exporter/internal/experr/err_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package experr

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewShutdownErr(t *testing.T) {
err := NewShutdownErr(errors.New("some error"))
assert.Equal(t, "interrupted due to shutdown: some error", err.Error())
}

func TestIsShutdownErr(t *testing.T) {
err := errors.New("testError")
require.False(t, IsShutdownErr(err))
err = NewShutdownErr(err)
require.True(t, IsShutdownErr(err))
}
3 changes: 2 additions & 1 deletion exporter/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/internal/experr"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

Expand Down Expand Up @@ -360,7 +361,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
pq.mu.Unlock()
}()

if errors.As(consumeErr, &shutdownErr{}) {
if experr.IsShutdownErr(consumeErr) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return
Expand Down
5 changes: 3 additions & 2 deletions exporter/internal/queue/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/experr"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -411,7 +412,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) {
}
assert.Equal(t, 3, ps.Size())
require.True(t, ps.Consume(func(context.Context, tracesRequest) error {
return NewShutdownErr(nil)
return experr.NewShutdownErr(nil)
}))
assert.Equal(t, 2, ps.Size())

Expand Down Expand Up @@ -523,7 +524,7 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) {
// put one more item in
require.NoError(t, ps.Offer(context.Background(), req))
require.Equal(t, 5, ps.Size())
return NewShutdownErr(nil)
return experr.NewShutdownErr(nil)
}))
assert.NoError(t, ps.Shutdown(context.Background()))

Expand Down

0 comments on commit cc88aee

Please sign in to comment.