(exporterhelper) Retry sender: fail request if context timeout < next retry (open-telemetry#11331)

#### Description
The retry sender currently waits until the context is canceled. Instead, it can fail fast with a transient error and a clear message that no further retries are possible given the configuration.
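
Below is a minimal, self-contained sketch of the idea, assuming a hypothetical `waitOrFail` helper (this is not the actual exporterhelper code): before sleeping for the next backoff interval, compare that interval against the time remaining until the context deadline, and return immediately when the retry could never run.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitOrFail sleeps for backoffDelay unless the context would expire first,
// in which case it fails fast with a descriptive error instead of blocking.
func waitOrFail(ctx context.Context, backoffDelay time.Duration, lastErr error) error {
	if deadline, ok := ctx.Deadline(); ok && time.Until(deadline) < backoffDelay {
		// The next retry would fire after the deadline; waiting is pointless.
		return fmt.Errorf("request will be cancelled before next retry: %w", lastErr)
	}
	select {
	case <-time.After(backoffDelay):
		return nil // proceed with the next retry attempt
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// With a 1s deadline and a 5s backoff, the sketch fails fast (~immediately).
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(waitOrFail(ctx, 5*time.Second, errors.New("transient error")))
}
```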

#### Link to tracking issue
Part of open-telemetry#11183 

#### Testing
One new test.
jmacd authored and jackgopack4 committed Oct 8, 2024
1 parent e959731 commit 4301658
Showing 3 changed files with 97 additions and 1 deletion.
25 changes: 25 additions & 0 deletions .chloggen/retry_sender_fail_fast.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Retry sender will fail fast when the context timeout is shorter than the next retry interval.

# One or more tracking issues or pull requests related to the change
issues: [11183]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
6 changes: 6 additions & 0 deletions exporter/exporterhelper/internal/retry_sender.go
@@ -108,6 +108,12 @@ func (rs *retrySender) Send(ctx context.Context, req internal.Request) error {
backoffDelay = max(backoffDelay, throttleErr.delay)
}

if deadline, has := ctx.Deadline(); has && time.Until(deadline) < backoffDelay {
// The delay is longer than the deadline. There is no point in
// waiting for cancelation.
return fmt.Errorf("request will be cancelled before next retry: %w", err)
}

backoffDelayStr := backoffDelay.String()
span.AddEvent(
"Exporting failed. Will retry the request after interval.",
67 changes: 66 additions & 1 deletion exporter/exporterhelper/internal/retry_sender_test.go
@@ -269,12 +269,77 @@ func TestQueueRetryWithDisabledRetires(t *testing.T) {
require.NoError(t, be.Shutdown(context.Background()))
}

type mockErrorRequest struct{}
func TestRetryWithContextTimeout(t *testing.T) {
const testTimeout = 10 * time.Second

rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = true

// The first retry is attempted after 100ms.
rCfg.InitialInterval = 100 * time.Millisecond
rCfg.RandomizationFactor = 0
// The second retry would come after 2*testTimeout (20s), past the context deadline.
rCfg.Multiplier = float64(2 * testTimeout / rCfg.InitialInterval)
qCfg := exporterqueue.NewDefaultConfig()
qCfg.Enabled = false
set := exportertest.NewNopSettings()
logger, observed := observer.New(zap.InfoLevel)
set.Logger = zap.New(logger)
be, err := NewBaseExporter(
set,
pipeline.SignalLogs,
newObservabilityConsumerSender,
WithRetry(rCfg),
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()),
)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
ocs := be.ObsrepSender.(*observabilityConsumerSender)
mockR := newErrorRequest()

start := time.Now()
ocs.run(func() {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
err := be.Send(ctx, mockR)
require.Error(t, err)
require.Equal(t, "request will be cancelled before next retry: transient error", err.Error())
})
assert.Len(t, observed.All(), 2)
assert.Equal(t, "Exporting failed. Will retry the request after interval.", observed.All()[0].Message)
assert.Equal(t, "Exporting failed. Rejecting data. "+
"Try enabling sending_queue to survive temporary failures.", observed.All()[1].Message)
ocs.awaitAsyncProcessing()
ocs.checkDroppedItemsCount(t, 7)
require.Equal(t, 2, mockR.(*mockErrorRequest).getNumRequests())
require.NoError(t, be.Shutdown(context.Background()))

// There should be no long delay, because the next retry interval is
// longer than the context timeout. We merely check that no delay on
// the order of either the context timeout or the second retry
// interval was introduced, i.e., the sender failed fast.
elapsed := time.Since(start)
require.Less(t, elapsed, testTimeout/2)
}

type mockErrorRequest struct {
mu sync.Mutex
requests int
}

func (mer *mockErrorRequest) Export(context.Context) error {
mer.mu.Lock()
defer mer.mu.Unlock()
mer.requests++
return errors.New("transient error")
}

func (mer *mockErrorRequest) getNumRequests() int {
mer.mu.Lock()
defer mer.mu.Unlock()
return mer.requests
}

func (mer *mockErrorRequest) OnError(error) internal.Request {
return mer
}
